package com.flink_demo.demo.kafka;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaStreamJdbc {
	public static final String ZOOKEEPER_HOST = "192.168.2.200:2181";
	public static final String KAFKA_BROKER = "192.168.2.200:9092";
	public static final String TRANSACTION_GROUP = "kafka_demo";

	/**
	 * Entry point: consumes string records from the Kafka topic {@code test-topic1},
	 * parses each record as an {@link Integer}, and inserts the values into a MySQL
	 * table through {@link MysqlSinkFunction}.
	 *
	 * @param args job arguments, exposed to operators via {@link ParameterTool}
	 * @throws Exception if the job fails to start or execute
	 */
	public static void main(String... args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		// NOTE(review): EventTime is enabled but no timestamp/watermark assigner is
		// registered on the source. Harmless here because the pipeline only maps and
		// sinks, but confirm before adding any event-time windows.
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
		// Checkpoint every 10 seconds with exactly-once semantics; on failure the job
		// restarts from the latest checkpoint (see MysqlSinkFunction.invoke).
		env.enableCheckpointing(10000);
		env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
		env.getConfig().setGlobalJobParameters(ParameterTool.fromArgs(args));
		// State lives in RocksDB; checkpoints are written to the local filesystem path below.
		RocksDBStateBackend rocksDBStateBackend =
				new RocksDBStateBackend("file:///usr/local/soft/flink/flink-1.3.2/chkpt");
		env.setStateBackend(rocksDBStateBackend);

		// Kafka consumer configuration.
		Properties kafkaProps = new Properties();
		kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
		kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKER);
		kafkaProps.setProperty("group.id", TRANSACTION_GROUP);

		// Topic is "test-topic1"; records are plain strings, so SimpleStringSchema suffices.
		DataStreamSource<String> transaction = env
				.addSource(new FlinkKafkaConsumer010<String>("test-topic1", new SimpleStringSchema(), kafkaProps));

		transaction.map(new MapFunction<String, Integer>() {

			private static final long serialVersionUID = 1L;

			@Override
			public Integer map(String val) throws Exception {
				// A malformed record throws NumberFormatException, failing the task
				// and triggering a restart from the last checkpoint.
				return Integer.valueOf(val);
			}
		})
		.addSink(new MysqlSinkFunction()).name("MysqlSinkFunction");
		env.execute();
	}

	/**
	 * Sink that inserts each {@link Integer} record into the MySQL table
	 * {@code test(id)} using a prepared statement opened once per task.
	 *
	 * <p>When {@link #invoke(Integer)} throws, Flink fails the task and the job is
	 * restarted from the last checkpoint, so processing resumes automatically.
	 */
	public static class MysqlSinkFunction extends RichSinkFunction<Integer> {
		private static final long serialVersionUID = 1L;
		private Connection connection;
		private PreparedStatement ps;

		/**
		 * Opens the JDBC connection and prepares the insert statement.
		 *
		 * @param parameters Flink runtime configuration
		 * @throws Exception if the driver cannot be loaded or the connection fails
		 */
		@Override
		public void open(Configuration parameters) throws Exception {
			// NOTE(review): credentials are hard-coded; move them to job parameters
			// or an external secret store before production use.
			String USERNAME = "root";
			String PASSWORD = "123456";
			String DBURL = "jdbc:mysql://192.168.2.200:3306/test";
			// Load the JDBC driver (legacy class name; newer connectors use
			// com.mysql.cj.jdbc.Driver — keep in sync with the bundled driver jar).
			Class.forName("com.mysql.jdbc.Driver");
			// Open the connection and prepare the parameterized insert once;
			// invoke() reuses it for every record.
			connection = DriverManager.getConnection(DBURL, USERNAME, PASSWORD);
			String sql = "insert into test(id) values (?)";
			ps = connection.prepareStatement(sql);
			super.open(parameters);
		}

		/**
		 * Writes one record. Any SQLException propagates, causing Flink to restart
		 * the job from the last checkpoint.
		 *
		 * @param value the parsed integer to insert
		 * @throws Exception on any JDBC failure
		 */
		@Override
		public void invoke(Integer value) throws Exception {
			ps.setInt(1, value);
			ps.executeUpdate();
		}

		/**
		 * Tear-down hook: releases the statement and connection. Uses nested
		 * try/finally so a failure closing the statement cannot leak the
		 * connection, and super.close() always runs.
		 */
		@Override
		public void close() throws Exception {
			try {
				if (ps != null) {
					ps.close();
				}
			} finally {
				try {
					if (connection != null) {
						connection.close();
					}
				} finally {
					super.close();
				}
			}
		}

	}
}
